dbtでSELECT文以外(DDL, DML,ストアドプロシージャ等)のクエリを移行・実装する手順 #dbt
アライアンス事業部 エンジニアグループ モダンデータスタック(MDS)チームのしんやです。
dbtでは良く、dbtの紹介をするときに『dbtではSELECT文が書ければ変換処理が書ける/作れる』的な説明をすることがあります。dbtのモデルに関する説明でも以下のような記載・言及があります。
A SQL model is a
select
statement. Models are defined in.sql
files (typically in yourmodels
directory):
(SQLモデルはselect文です。モデルは.sqlファイルで定義されます(通常はmodelsディレクトリにあります):)- Each
.sql
file contains one model /select
statement
(各 .sql ファイルには、1 つのモデル / select 文が含まれます。)
- The model name is inherited from the filename.
(モデル名はファイル名から継承されます。)
- We strongly recommend using underscores for model names, not dots. For example, usemodels/my_model.sql
instead ofmodels/my.model.sql
.
(モデル名にはドットではなくアンダースコアを使うことを強く推奨します。例えば、models/my.model.sqlの代わりにmodels/my_model.sqlを使います。)
- Models can be nested in subdirectories within themodels
directory.
(モデルはmodelsディレクトリ内のサブディレクトリに入れ子にすることができる。)
SQL models | dbt Developer Hub
『なるほど確かに、dbtではSELECT文でモデルを作り上げていくということは分かった。でもデータ分析、SQLで変換処理を書くときってSELECT文以外にも色々使うよね?それってどうするの?』と思ったのは私だけでは無いはずです。実際どういう風にその辺りを上手いこと対応させているのでしょうか。
という質問・疑問にdbtの公式ドキュメントで回答しているものがあります。「dbtでSELECT以外の処理をどういう形で実現させているのか」の一例となるこちらの内容について、当エントリで紹介していきたいと思います。
目次
- はじめに
- INSERT文のマッピング(dbtで動作するSQLへの置き換え作業)について
- UPDATE文のマッピング(dbtで動作するSQLへの置き換え作業)について
- DELETE文のマッピング(dbtで動作するSQLへの置き換え作業)について
- MERGE文のマッピング(dbtで動作するSQLへの置き換え作業)について
- ストアドプロシージャの移行作業について
- まとめ
はじめに
ドキュメント冒頭では、既存のデータ変換処理で実装していた処理をdbtで動くように置き換える上で、FROM/TOへの大幅なパラダイムシフトが伴うことは良くあることだ、という風に述べています。まさにこのエントリで対象としているトピック・テーマですね。
- [FROM] データセットを構築する手続き的なフロー(例えば、一連のDDLステートメントやDMLステートメント)
- [TO] データセットを定義する宣言的なアプローチ(例えば、dbtがデータモデルを表現するためにSELECTステートメントを使用する方法)
ちなみにここでタイトルとしても挙げている「DDL」「DML」「ストアドプロシージャ」とはそもそもどういったものなのか?という部分についても、簡単ではありますがdbtの関連ドキュメントから認識を合わせておきたいと思います。
- DDL(Data Manipulation Language):
- データベーステーブルまたはビューの行レベルのデータのクエリ、編集、追加、削除に使用されるSQLステートメント。
- 主なDMLステートメントは、SELECT、INSERT、DELETE、および UPDATE。
- データベースとその中のオブジェクトの構造を編集および操作するために使用できる一連のSQLステートメントであるデータ定義言語(DDL)と対比される。
- 参考: DML: The SQL statements that make the data world go 'round
- DML(Data Definition Language):
- テーブルやビューなどのデータベースオブジェクトを管理するために実行できるSQLステートメントのグループ。
- DDLステートメントを使用すると、オブジェクトの作成、変更、削除などの強力なコマンドをデータベース内で実行できる。
- 主なDDLステートメントは、ALTER、DROP、CREATE、TRUNCATE。
- DDLコマンドは通常、SQLブラウザまたはストアド プロシージャで実行される。
- 参考: What is Data Definition Language (DDL) in SQL?
- ストアド・プロシージャ(Stored Procedure):
- データウェアハウスの世界で広く使われている仕組み。複雑な変換をスケジューリング可能なユニットにカプセル化し、パラメータを介して条件ロジックに応答するのに適している。
- 参考: Migrating from Stored Procedures to dbt | dbt Developer Blog
移行の準備
まず前提として「DMLステートメントは元のテーブルに含まれる可能性のある列と、列の種類の包括的なセットを常に示す訳では無い」ということに注意してください。変換作業が同一であるかどうかを正確に把握するためにはテーブルを作成するためのDDLを把握する必要があります。
- 対象となるデータウェアハウスが
SHOW CREATE TABLE
をサポートしている場合、再作成する列の包括的なセットを簡単に取得できます。- Snowflakeの場合
- Google BigQueryの場合
- Amazon Redshiftの場合
- DDLを持っていないけど、実質的なストアドプロシージャを作成している場合、有効なアプローチの1つは、テーブルを変更する DMLステートメントから列リストを取得し、表示される列の完全なセットを構築するという方法が検討出来るでしょう。
dbtによってマテリアライズ(具体化)されたモデルは通常、オブジェクト作成のドライバーとしてCREATE TABLE AS SELECT
または CREATE VIEW AS SELECT
を使用します。このため、クエリが明示的でない場合、テーブルが意図しない列タイプになる可能性があります。
- 例えば、
INT
、DECIMAL
、NUMERIC
といったデータ型に関してそれぞれを識別させたい場合は明示的にその情報を示す必要があります。幸いなことに、dbtでこれを実現させたい場合であれば列を目的の型にキャストするだけで対応が行えます。
また、dbtにおいては一般的にこのような「列の名前変更と型キャストを行うSQL」はできるだけソーステーブルの近くで(通常はステージング変換のレイヤーに位置するモデルの内部で)実行することをお勧めします。これにより、将来のdbt Modelを扱う開発者が、これらの変換をどこで行えば(記述すれば)良いかを把握しやすくなります。全体的なプロジェクト構造の詳細については、下記のドキュメントをご参照ください。
当エントリで言及していくことになる「SQL処理のマッピング(既存SQL処理をdbtで動作するSQLに置き換える作業)が必要なSQLは主に以下4つです。最後のMERGE文はちょっと複雑になりますが、dbtにおける様々な手法を用いることで効果的に処理の置き換えを行うことが出来ます。
INSERT
UPDATE
DELETE
MERGE
INSERT文のマッピング(dbtで動作するSQLへの置き換え作業)について
INSERT文の置き換え作業は、dbtを使用して既存のソースまたは他のdbtモデルからSELECT文をを実行する場合と機能的には同じです。
INSERT-SELECT
文に変換する最も簡単な方法は、新しいdbtモデルを作成し、INSERT文のSELECT部分を取り出してモデルに取り込むことです。 例えば以下のようなINSERT INTO〜SELECT文があった場合、
INSERT INTO returned_orders (order_id, order_date, total_return) SELECT order_id, order_date, total FROM orders WHERE type = 'return'
returned_orders.sql
というファイル名でSQLモデルを作成し、以下のSQLを記載します。
SELECT order_id as order_id, order_date as order_date, total as total_return FROM {{ ref('orders') }} WHERE type = 'return'
機能的には、この作業によって任意のデータ型を前提とした3つの列(order_id
、order_date
、total_return
) を含む returns_orders
と呼ばれるSQLモデル(ユーザーの要望に応じてテーブルまたはビューとしてマテリアライズ/具体化の指定が可能)が作成されます。 dbtを使用して、宣言的な方法でINSERTと同じ目的を達成出来ました。
FROM句に関する注意
dbtによるモデル開発では、FROM句でハードコーディングされたテーブル名またはビュー名を使用することは、新規ユーザーが犯す最も重大な間違いの1つです。
dbtではrefマクロとsourceマクロというとても便利な機能を活用することが出来ます。これらを使うとdbtは変換を実行する必要がある順序を検出し、dbtの組み込みリネージ生成とパイプライン実行の恩恵を受けることができるようになります。
- About ref function | dbt Developer Hub
- About source function | dbt Developer Hub
- What is data lineage? And how do you get started?
(※sourceマクロ、refマクロを適切に活用することにより、以下のようなリネージグラフを自動的にdbtが生成、テーブル間参照や依存関係をわかりやすく表示してくれるようになります)
この記事の残りの部分のサンプル コードでは、SQL ステートメントの dbt 変換バージョンで ref ステートメントを使用しますが、これは読者がそれらのモデルが dbt プロジェクトに存在することを確認するための演習です。
既存のテーブルへの連続INSERTはUNION ALL
で結合可能
dbt モデルは単一のCREATE TABLE AS SELECT
(ステップに分ける場合は、CREATE
からのINSERT
)を効果的に実行するため、変換内にデータを同じテーブルに挿入する複数の INSERT文がある場合、複雑になる可能性があります。
このような場合、dbtでは幸いなことにUNION ALL
を活用することで簡単に処理できます。例えば元ネタとしてのクエリで以下のようなものがあるとします。
CREATE TABLE all_customers INSERT INTO all_customers SELECT * FROM us_customers INSERT INTO all_customers SELECT * FROM eu_customers
これをdbtで動作するクエリに置き換えると以下のような内容となります:
SELECT * FROM {{ ref('us_customers') }} UNION ALL SELECT * FROM {{ ref('eu_customers') }}
ロジックは機能的に同等なので、既に作成したモデルにINSERTする別のSELECT文がある場合は、そのロジックを最初のSELECT文とUNION ALLしただけの 2番目のSELECT文に追加するだけです。
UPDATE文のマッピング(dbtで動作するSQLへの置き換え作業)について
UPDATE
を変換するときに行う思考プロセスは、INSERT
の動作と非常に良く似ています。dbtモデルにおけるSELECT
リストのロジックは主にSET
セクションの内容から取得します。以下のようなUPDATE文があった場合、
UPDATE orders SET type = 'return' WHERE total < 0
内容を確認する方法としては、INSERT
-SELECT
文で行ったときのものと似ています。 更新中のテーブルは変更したいモデルであり、行う処理は「UPDATE文」であるため、対象となるモデルは既に作成されている可能性が高く、次のいずれかを行うことができます。
- 後続の変換でそれらに情報を追加
- 元のモデルから構築された中間モデルを作成 - おそらくは
int_[entity]_[verb].sql
のような名前を付ける形で
SELECT
文のリストにはテーブルのすべての列が含まれている必要がありますが、DMLによって更新される特定の列については、等号の右側の計算をSELECTされた値として使用します。 その後、等号の左側にあるターゲット列名を列の別名として使用できます。上記のクエリから中間変換を構築している場合、次のような内容に変換されます。
SELECT CASE WHEN total < 0 THEN 'return' ELSE type END AS type, order_id, order_date FROM {{ ref('stg_orders') }}
UPDATE
文は指定した列の全ての値を変更するわけではないため、CASEステートメントを使用してコンテンツのWHERE句を適用します。 そして最終的にターゲットテーブルに含める全ての列を選択する必要があります。 対象列のを省略してしまうと、dbtの宣言的アプローチにより、その列はターゲットテーブルにまったく渡されなくなるので注意が必要です。
場合によっては、テーブル内のすべての列が何であるかがわからない場合や、上記のような状況では、テーブル内の列の合計数に対して少数の列のみを変更しているだけである場合があります。 テーブル内のすべての列をリストするのは面倒な場合がありますが、幸いなことにdbtには、テーブルの完全な列リストをリストするのに役立ついくつかの便利なユーティリティマクロが含まれています。モデルをもう少し動的に記述する別の方法は以下の通りです。
SELECT {{ dbt_utils.star(from=ref('stg_orders'), except=['type']) }}, CASE WHEN total < 0 THEN 'return' ELSE type END AS type, FROM {{ ref('stg_orders') }}
dbt_utils.star()
マクロはテーブル内の列の完全なリストを出力しますが、除外リストにリストした列はスキップします。これにより、より少ないコード行を記述しながら同じロジックを実行できます。 これは、dbt マクロを使用してコードを簡素化し、短縮する簡単な例です。より多くのテクニックを学ぶことで、dbtで記述するクエリを更に洗練させることも出来るようになります。詳細については、dbt_utilsパッケージとstarマクロの詳細を参照してください。
DELETE文のマッピング(dbtで動作するSQLへの置き換え作業)について
「既存のSQLによる変換処理」と「dbtがデータをモデル化する際の方法」の最大の違いの1つは、『一般的にdbtがデータを破壊しない』という点です。(この記事では言及しませんが)dbt上でDELETE
文を実行する方法は無いわけではないですが、dbt上でデータ削除処理を行うベストプラクティスは論理的な削除のみを使用し、最終的な変換で論理的に削除されたデータをフィルターで除外することです。
以下のようなDELETE文があるとします。
DELETE FROM stg_orders WHERE order_status IS NULL
dbtモデルでは、まず削除すべきレコードを特定し、それらをフィルタリングする必要があります。このクエリの主な翻訳方法は2つあります:
SELECT * FROM {{ ref('stg_orders') }} WHERE order_status IS NOT NULL
1つ目のアプローチは、DELETEのロジックを反転して、削除する必要があるレコードのセットではなく、残すべきレコードのセットを記述するだけです。 これは、dbtがデータセットを宣言的に記述する方法に関係しており、データセット内に存在する必要があるデータを参照すると、そのデータセットを使用してテーブルまたはビューが作成されます。
もう1つのアプローチは、削除されたレコードをマークし、それらをフィルターで除外することです。 例えば:
WITH soft_deletes AS ( SELECT *, CASE WHEN order_status IS NULL THEN true ELSE false END AS to_delete FROM {{ ref('stg_orders') }} ) SELECT * FROM soft_deletes WHERE to_delete = false
この方法の場合、削除されたすべてのレコードにフラグが付けられ、最後のSELECT
で削除されたデータが除外されるため、結果のテーブルには残りのレコードのみが含まれます。 これは、単にDELETE
ロジックを反転するよりもはるかに冗長ですが、複雑なDELETEロジックの場合、これは履歴コンテキストを保持したDELETEを実行する非常に効果的な方法となります。
ちなみにハード削除(hard delete)は幾つかの方法で実行が可能です。(ここでは情報参照のみに留めておきます)
- 最も一般的なのは、
run-operation
としてdbtマクロを実行する方法です。 - また、
post-hook
の仕組みを使用して、削除対象のレコードがマークされた後にDELETE
文を実行することも出来ます。
MERGE文のマッピング(dbtで動作するSQLへの置き換え作業)について
dbtにはマテリアライゼーション(materialization)と呼ばれる概念があり、ウェアハウス内でモデルが物理的または論理的にどのように表現されるかを決定しています。
INSERT
、UPDATE
、DELETE
は通常、テーブルまたはビューの実体化を使用して実行されます。MERGE
やUPSERT
などのコマンドによって実行される増分ワークロードの場合、dbt にはインクリメンタル(incremental/増分)と呼ばれる特定の具体化があります。- 増分マテリアライゼーションは、実行のたびにテーブル全体を最初から再作成することなく、テーブルへの増分ロードと更新を処理するために特に使用されます。
Step 1: MERGE文をINSERT/UPDATE文のようにマッピングして行く
まずはロジックの変換から。MERGE
のロジックを抽出し、INSERT
またはUPDATE
と同じように処理するのが、MERGEコマンドの移行を開始する最も簡単な方法です。ロジック変換がどのように機能するかを確認するために、MERGEの例から始めます。このシナリオでは、乗り物が毎日詳細テーブルに読み込まれ、ヒントは後日更新される可能性があるため、最新の状態に保つ必要がある配車アプリを想定しています。
MERGE INTO ride_details USING ( SELECT ride_id, subtotal, tip FROM rides_to_load AS rtl ON ride_details.ride_id = rtl.ride_id WHEN MATCHED THEN UPDATE SET ride_details.tip = rtl.tip WHEN NOT MATCHED THEN INSERT (ride_id, subtotal, tip) VALUES (rtl.ride_id, rtl.subtotal, NVL(rtl.tip, 0, rtl.tip) );
USING
句の内容は、match ステートメントを処理する開始点として CTE(Common Table Expressions/共通テーブル式)に簡単に配置できるため、便利なコードです。これを分割する最も簡単な方法は、各match ステートメントを、前のmatchステートメントに基づいて構築された個別のCTEとして扱うことです。
- MERGE | Snowflake Documentation
- Data manipulation language (DML) statements in GoogleSQL | BigQuery | Google Cloud
- MERGE - Amazon Redshift
ON
句は、これをインクリメンタルに変える準備ができた時点でのみ有効となるため、今のところ無視できます。
UPDATE
やINSERT
と同様に、SELECT
リストとエイリアスを使用してターゲットテーブルに適切な列に名前を付けることができ、UNION
を組み合わせてINSERT
文を使用できます(重複を避けるためにUNION ALL
ではなくUNION
を使用するように注意してください)。
MERGE は最終的に次のように変換されます。
WITH using_clause AS ( SELECT ride_id, subtotal, tip FROM {{ ref('rides_to_load') }} ), updates AS ( SELECT ride_id, subtotal, tip FROM using_clause ), inserts AS ( SELECT ride_id, subtotal, NVL(tip, 0, tip) FROM using_clause ) SELECT * FROM updates UNION inserts
この時点ではまだ置き換え処理は完了していません。ここでのロジックは MERGEに似ていますが、更新と挿入のCTEはどちらも同じソースクエリから選択しているため、実際には同じことを行いません。増分マテリアライゼーションに移行する際には、個別のデータセットを確実に取得する必要があります。
重要な注意点の1つは、dbtがMATCHアクションとしてDELETE
をネイティブにサポートしていないことです。 MERGE
文にWHEN MATCHED THEN DELETE
を使用する行がある場合、それを更新のように扱い、論理的な削除フラグを追加して、後続の変換で除外することをお勧めします。
Step 2: 増分マテリアライゼーションへの変換
前述のように、増分マテリアライゼーションは、ターゲット テーブルが存在しない場合、標準のテーブルマテリアライゼーションとほぼ同じように機能し、CREATE TABLE AS SELECT
文を実行するという点で少し特殊です。 但し、ターゲットテーブルが存在する場合、マテリアライズでは代わりにMERGE
文が実行されます。
MERGE
ではUSING
句とターゲット テーブルの間にJOIN
条件が必要なため、レコードが一致をトリガーするかどうかをdbtが判断する方法を指定する方法が必要です。 その特定の情報は、dbtモデルに関するデル設定で指定されます。
次のconfig()
ブロックをモデルの先頭に追加して、段階的に構築する方法を指定できます。
{{ config( materialized='incremental', unique_key='ride_id', incremental_strategy='merge' ) }}
上記設定例における3つのフィールドに関する設定内容の解説は以下の通り。
materialized='incremental'
を設定すると、ターゲットテーブルにUPSERT
ロジックを適用するようにdbtに指示出来ます。unique_key
は、ターゲットテーブルの主キーである必要があります。 これは、レコードを既存のテーブルと照合するために使用されます。incremental_strategy
は、ターゲットテーブル内の既存の行を、受信したデータのバッチに一致するunique_keyの値とMERGEするように設定されています。 さまざまな状況や倉庫に応じてさまざまな増分戦略があります。
設定周りの内容については下記ドキュメントを参照。
モデルを増分マテリアライゼーションに変換する際の作業の大部分は、増分ロードと完全バックフィルまたは初期ロードに対してロジックをどのように変更するかを決定することにあります。 dbtは、特別なマクロis_incremental()
を提供します。これは、初期ロードまたはバックフィル (dbt用語では完全リフレッシュと呼ばれます) の場合はfalseと評価されますが、増分ロードの場合はtrueと評価されます。
このマクロを使用してモデルコードを拡張し、後続のロードでのデータのロード方法を調整できます。そのロジックをどのように追加するかは、データの受信方法によって少し異なりますが、一般的な方法としては次のようなものがあります。
- ソーステーブルは増分ロードの前に切り詰められ、その増分でロードされるデータのみが含まれる
- ソーステーブルにはすべての履歴データが含まれており、ロードされる新しいデータを識別するロード タイムスタンプ列がある
最初のケースでは、作業は基本的にすでに完了しています。 ソーステーブルには常にロードされる新しいデータのみが含まれるため、増分ロードのためにクエリを変更する必要はありません。
ただし、2番目のケースでは、ロジックを正しく処理するためにis_incremental()
マクロを使用する必要があります。
以前にまとめた変換後のMERGEステートメントを拡張して、次の追加ロジックを追加。
WITH using_clause AS ( SELECT ride_id, subtotal, tip, max(load_timestamp) as load_timestamp FROM {{ ref('rides_to_load') }} {% if is_incremental() %} WHERE load_timestamp > (SELECT max(load_timestamp) FROM {{ this }}) {% endif %} ), updates AS ( SELECT ride_id, subtotal, tip, load_timestamp FROM using_clause {% if is_incremental() %} WHERE ride_id IN (SELECT ride_id FROM {{ this }}) {% endif %} ), inserts AS ( SELECT ride_id, subtotal, NVL(tip, 0, tip), load_timestamp FROM using_clause WHERE ride_id NOT IN (SELECT ride_id FROM updates) ) SELECT * FROM updates UNION inserts
ここで理解すべき重要な概念がいくつかあります。
is_incremental()
条件ブロック内のコードは、このモデル ードの増分実行に対してのみ実行されます。 ターゲットテーブルが存在しない場合、または--full-refresh
オプションが使用されている場合、そのコードは実行されません。{{ this }}
はdbtの特別なキーワードで、Jinjaブロックで使用すると、コードが実行されているモデルを自己参照します。 したがって、my_incremental_model.sql
というファイルにモデルがある場合、{{ this }}
はmy_incremental_model
(必要に応じてデータベース名とスキーマ名で完全修飾)を参照します。 このキーワードを使用すると、ターゲット テーブルの現在の状態を利用してソース クエリに通知できます。
ストアドプロシージャの移行作業について
ここまでで紹介した内容は、ストアドプロシージャでよく見られる個々のDMLステートメントの変換を開始するのにも参考となります。従来の手続き型コードを、より読みやすく保守しやすく、DRY 原則などのソフトウェアエンジニアリングのベストプラクティスの恩恵を受けるdbtモデルに迅速に移行できます。 更に、記載・設定した変換内容がdbtモデルとして実行されると、下流で使用されるデータが高品質で信頼できるものであることを確認するための『変換に対するテスト』が非常に簡単になります。
ページ・エントリとしては別になりますが、下記dbt社のブログも情報としては参考になります。ストアドプロシージャのdbt移行を検討されている方は是非参考にしてみてください。
まとめ
という訳で、dbtにおけるSELECT文以外のDDL、DML、ストアドプロシージャ等の移行・置き換えに関する手順・指針の紹介でした。
『dbtの世界のルールに処理を置き換える』作業となるため、最初は慣れない部分も幾らか出てくることはあるかと思います。ですがdbtの流儀に則った形で置き換え作業を行っておくことで、後続の作業や分析等で十分過ぎる恩恵を得ることが出来るので徐々に慣れて行って頂けますと幸いです。一方で文中言及されていた『SELECT文以外のSQLをdbtで実行する方法』も無くは無いので、エントリを改める形でそれらの内容についても紹介出来ればとも思います。